package com.spring.boot.kafka.config;

import cn.hutool.json.JSONUtil;
import com.spring.boot.kafka.model.User;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.support.serializer.JsonSerde;

/**
 * @author liuzhiqiang
 */
@EnableKafkaStreams
@Configuration
@Slf4j
public class KafkaStreams {

    @Autowired
    private StreamsBuilder streamsBuilder;

    @Bean
    public KStream<String, String> kStream() {
        KStream<String, String> stream = streamsBuilder.stream("userTopic");
        // 将userTopic话题中大于10岁的放入aboveUserTopic
        stream.filter((k, v) -> {
            User user = JSONUtil.toBean(v, User.class);
            if (user.getAge() > 10) {
                log.info("大于10岁【{}】", v);
                return true;
            } else {
                log.info("小于10岁【{}】", v);
                return false;
            }
        }).to("aboveUserTopic", Produced.with(Serdes.String(), Serdes.String()));
        return stream;
    }
}
